1 package org.apache.lucene.replicator;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.io.PrintStream;
24 import java.nio.file.Path;
25 import java.util.HashMap;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.apache.lucene.document.Document;
31 import org.apache.lucene.facet.DrillDownQuery;
32 import org.apache.lucene.facet.FacetField;
33 import org.apache.lucene.facet.Facets;
34 import org.apache.lucene.facet.FacetsCollector;
35 import org.apache.lucene.facet.FacetsConfig;
36 import org.apache.lucene.facet.taxonomy.FastTaxonomyFacetCounts;
37 import org.apache.lucene.facet.taxonomy.TaxonomyReader;
38 import org.apache.lucene.facet.taxonomy.TaxonomyWriter;
39 import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyReader;
40 import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
41 import org.apache.lucene.index.CheckIndex;
42 import org.apache.lucene.index.DirectoryReader;
43 import org.apache.lucene.index.IndexWriter;
44 import org.apache.lucene.index.IndexWriterConfig;
45 import org.apache.lucene.index.SegmentInfos;
46 import org.apache.lucene.index.SnapshotDeletionPolicy;
47 import org.apache.lucene.replicator.IndexAndTaxonomyRevision.SnapshotDirectoryTaxonomyWriter;
48 import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
49 import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory;
50 import org.apache.lucene.search.IndexSearcher;
51 import org.apache.lucene.search.MatchAllDocsQuery;
52 import org.apache.lucene.search.TopDocs;
53 import org.apache.lucene.store.Directory;
54 import org.apache.lucene.store.MockDirectoryWrapper;
55 import org.apache.lucene.util.IOUtils;
56 import org.apache.lucene.util.TestUtil;
57 import org.apache.lucene.util.ThreadInterruptedException;
58 import org.junit.After;
59 import org.junit.Before;
60 import org.junit.Test;
61
62 public class IndexAndTaxonomyReplicationClientTest extends ReplicatorTestCase {
63
64 private static class IndexAndTaxonomyReadyCallback implements Callable<Boolean>, Closeable {
65
66 private final Directory indexDir, taxoDir;
67 private DirectoryReader indexReader;
68 private DirectoryTaxonomyReader taxoReader;
69 private FacetsConfig config;
70 private long lastIndexGeneration = -1;
71
72 public IndexAndTaxonomyReadyCallback(Directory indexDir, Directory taxoDir) throws IOException {
73 this.indexDir = indexDir;
74 this.taxoDir = taxoDir;
75 config = new FacetsConfig();
76 config.setHierarchical("A", true);
77 if (DirectoryReader.indexExists(indexDir)) {
78 indexReader = DirectoryReader.open(indexDir);
79 lastIndexGeneration = indexReader.getIndexCommit().getGeneration();
80 taxoReader = new DirectoryTaxonomyReader(taxoDir);
81 }
82 }
83
84 @Override
85 public Boolean call() throws Exception {
86 if (indexReader == null) {
87 indexReader = DirectoryReader.open(indexDir);
88 lastIndexGeneration = indexReader.getIndexCommit().getGeneration();
89 taxoReader = new DirectoryTaxonomyReader(taxoDir);
90 } else {
91
92 DirectoryReader newReader = DirectoryReader.openIfChanged(indexReader);
93 assertNotNull("should not have reached here if no changes were made to the index", newReader);
94 long newGeneration = newReader.getIndexCommit().getGeneration();
95 assertTrue("expected newer generation; current=" + lastIndexGeneration + " new=" + newGeneration, newGeneration > lastIndexGeneration);
96 indexReader.close();
97 indexReader = newReader;
98 lastIndexGeneration = newGeneration;
99 TestUtil.checkIndex(indexDir);
100
101
102 DirectoryTaxonomyReader newTaxoReader = TaxonomyReader.openIfChanged(taxoReader);
103 if (newTaxoReader != null) {
104 taxoReader.close();
105 taxoReader = newTaxoReader;
106 }
107 TestUtil.checkIndex(taxoDir);
108
109
110 int id = Integer.parseInt(indexReader.getIndexCommit().getUserData().get(VERSION_ID), 16);
111 IndexSearcher searcher = new IndexSearcher(indexReader);
112 FacetsCollector fc = new FacetsCollector();
113 searcher.search(new MatchAllDocsQuery(), fc);
114 Facets facets = new FastTaxonomyFacetCounts(taxoReader, config, fc);
115 assertEquals(1, facets.getSpecificValue("A", Integer.toString(id, 16)).intValue());
116
117 DrillDownQuery drillDown = new DrillDownQuery(config);
118 drillDown.add("A", Integer.toString(id, 16));
119 TopDocs docs = searcher.search(drillDown, 10);
120 assertEquals(1, docs.totalHits);
121 }
122 return null;
123 }
124
125 @Override
126 public void close() throws IOException {
127 IOUtils.close(indexReader, taxoReader);
128 }
129 }
130
131 private Directory publishIndexDir, publishTaxoDir;
132 private MockDirectoryWrapper handlerIndexDir, handlerTaxoDir;
133 private Replicator replicator;
134 private SourceDirectoryFactory sourceDirFactory;
135 private ReplicationClient client;
136 private ReplicationHandler handler;
137 private IndexWriter publishIndexWriter;
138 private SnapshotDirectoryTaxonomyWriter publishTaxoWriter;
139 private FacetsConfig config;
140 private IndexAndTaxonomyReadyCallback callback;
141 private Path clientWorkDir;
142
143 private static final String VERSION_ID = "version";
144
145 private void assertHandlerRevision(int expectedID, Directory dir) throws IOException {
146
147
148
149 while (client.isUpdateThreadAlive()) {
150
151 try {
152 Thread.sleep(100);
153 } catch (InterruptedException e) {
154 throw new ThreadInterruptedException(e);
155 }
156
157 try {
158 DirectoryReader reader = DirectoryReader.open(dir);
159 try {
160 int handlerID = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
161 if (expectedID == handlerID) {
162 return;
163 }
164 } finally {
165 reader.close();
166 }
167 } catch (Exception e) {
168
169
170
171 }
172 }
173 }
174
175 private Revision createRevision(final int id) throws IOException {
176 publishIndexWriter.addDocument(newDocument(publishTaxoWriter, id));
177 publishIndexWriter.setCommitData(new HashMap<String, String>() {{
178 put(VERSION_ID, Integer.toString(id, 16));
179 }});
180 publishIndexWriter.commit();
181 publishTaxoWriter.commit();
182 return new IndexAndTaxonomyRevision(publishIndexWriter, publishTaxoWriter);
183 }
184
185 private Document newDocument(TaxonomyWriter taxoWriter, int id) throws IOException {
186 Document doc = new Document();
187 doc.add(new FacetField("A", Integer.toString(id, 16)));
188 return config.build(taxoWriter, doc);
189 }
190
191 @Override
192 @Before
193 public void setUp() throws Exception {
194 super.setUp();
195 publishIndexDir = newDirectory();
196 publishTaxoDir = newDirectory();
197 handlerIndexDir = newMockDirectory();
198 handlerTaxoDir = newMockDirectory();
199 clientWorkDir = createTempDir("replicationClientTest");
200 sourceDirFactory = new PerSessionDirectoryFactory(clientWorkDir);
201 replicator = new LocalReplicator();
202 callback = new IndexAndTaxonomyReadyCallback(handlerIndexDir, handlerTaxoDir);
203 handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, callback);
204 client = new ReplicationClient(replicator, handler, sourceDirFactory);
205
206 IndexWriterConfig conf = newIndexWriterConfig(null);
207 conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
208 publishIndexWriter = new IndexWriter(publishIndexDir, conf);
209 publishTaxoWriter = new SnapshotDirectoryTaxonomyWriter(publishTaxoDir);
210 config = new FacetsConfig();
211 config.setHierarchical("A", true);
212 }
213
214 @After
215 @Override
216 public void tearDown() throws Exception {
217 publishIndexWriter.close();
218 IOUtils.close(client, callback, publishTaxoWriter, replicator, publishIndexDir, publishTaxoDir,
219 handlerIndexDir, handlerTaxoDir);
220 super.tearDown();
221 }
222
223 @Test
224 public void testNoUpdateThread() throws Exception {
225 assertNull("no version expected at start", handler.currentVersion());
226
227
228 replicator.publish(createRevision(1));
229 client.updateNow();
230
231
232 client.updateNow();
233
234 replicator.publish(createRevision(2));
235 client.updateNow();
236
237
238 replicator.publish(createRevision(3));
239 replicator.publish(createRevision(4));
240 client.updateNow();
241 }
242
243 @Test
244 public void testRestart() throws Exception {
245 replicator.publish(createRevision(1));
246 client.updateNow();
247
248 replicator.publish(createRevision(2));
249 client.updateNow();
250
251 client.stopUpdateThread();
252 client.close();
253 client = new ReplicationClient(replicator, handler, sourceDirFactory);
254
255
256 replicator.publish(createRevision(3));
257 replicator.publish(createRevision(4));
258 client.updateNow();
259 }
260
261 @Test
262 public void testUpdateThread() throws Exception {
263 client.startUpdateThread(10, "indexTaxo");
264
265 replicator.publish(createRevision(1));
266 assertHandlerRevision(1, handlerIndexDir);
267
268 replicator.publish(createRevision(2));
269 assertHandlerRevision(2, handlerIndexDir);
270
271
272 replicator.publish(createRevision(3));
273 replicator.publish(createRevision(4));
274 assertHandlerRevision(4, handlerIndexDir);
275 }
276
277 @Test
278 public void testRecreateTaxonomy() throws Exception {
279 replicator.publish(createRevision(1));
280 client.updateNow();
281
282
283 Directory newTaxo = newDirectory();
284 new DirectoryTaxonomyWriter(newTaxo).close();
285 publishTaxoWriter.replaceTaxonomy(newTaxo);
286 publishIndexWriter.deleteAll();
287 replicator.publish(createRevision(2));
288
289 client.updateNow();
290 newTaxo.close();
291 }
292
293
294
295
296
297
298
299 @Test
300 public void testConsistencyOnExceptions() throws Exception {
301
302 replicator.publish(createRevision(1));
303 client.updateNow();
304 client.close();
305 callback.close();
306
307
308
309
310
311
312
313
314
315
316 handlerIndexDir.setPreventDoubleWrite(false);
317 handlerTaxoDir.setPreventDoubleWrite(false);
318
319
320 final SourceDirectoryFactory in = sourceDirFactory;
321 final AtomicInteger failures = new AtomicInteger(atLeast(10));
322 sourceDirFactory = new SourceDirectoryFactory() {
323
324 private long clientMaxSize = 100, handlerIndexMaxSize = 100, handlerTaxoMaxSize = 100;
325 private double clientExRate = 1.0, handlerIndexExRate = 1.0, handlerTaxoExRate = 1.0;
326
327 @Override
328 public void cleanupSession(String sessionID) throws IOException {
329 in.cleanupSession(sessionID);
330 }
331
332 @SuppressWarnings("synthetic-access")
333 @Override
334 public Directory getDirectory(String sessionID, String source) throws IOException {
335 Directory dir = in.getDirectory(sessionID, source);
336 if (random().nextBoolean() && failures.get() > 0) {
337 MockDirectoryWrapper mdw = new MockDirectoryWrapper(random(), dir);
338 mdw.setRandomIOExceptionRateOnOpen(clientExRate);
339 mdw.setMaxSizeInBytes(clientMaxSize);
340 mdw.setRandomIOExceptionRate(clientExRate);
341 mdw.setCheckIndexOnClose(false);
342 clientMaxSize *= 2;
343 clientExRate /= 2;
344 return mdw;
345 }
346
347 if (failures.get() > 0 && random().nextBoolean()) {
348 if (random().nextBoolean()) {
349 handlerIndexDir.setMaxSizeInBytes(handlerIndexMaxSize);
350 handlerIndexDir.setRandomIOExceptionRate(handlerIndexExRate);
351 handlerIndexDir.setRandomIOExceptionRateOnOpen(handlerIndexExRate);
352 handlerIndexMaxSize *= 2;
353 handlerIndexExRate /= 2;
354 } else {
355 handlerTaxoDir.setMaxSizeInBytes(handlerTaxoMaxSize);
356 handlerTaxoDir.setRandomIOExceptionRate(handlerTaxoExRate);
357 handlerTaxoDir.setRandomIOExceptionRateOnOpen(handlerTaxoExRate);
358 handlerTaxoDir.setCheckIndexOnClose(false);
359 handlerTaxoMaxSize *= 2;
360 handlerTaxoExRate /= 2;
361 }
362 } else {
363
364 handlerIndexDir.setMaxSizeInBytes(0);
365 handlerIndexDir.setRandomIOExceptionRate(0.0);
366 handlerIndexDir.setRandomIOExceptionRateOnOpen(0.0);
367 handlerTaxoDir.setMaxSizeInBytes(0);
368 handlerTaxoDir.setRandomIOExceptionRate(0.0);
369 handlerTaxoDir.setRandomIOExceptionRateOnOpen(0.0);
370 }
371
372 return dir;
373 }
374 };
375
376 handler = new IndexAndTaxonomyReplicationHandler(handlerIndexDir, handlerTaxoDir, new Callable<Boolean>() {
377 @Override
378 public Boolean call() throws Exception {
379 if (random().nextDouble() < 0.2 && failures.get() > 0) {
380 throw new RuntimeException("random exception from callback");
381 }
382 return null;
383 }
384 });
385
386 final AtomicBoolean failed = new AtomicBoolean();
387
388
389 client = new ReplicationClient(replicator, handler, sourceDirFactory) {
390 @SuppressWarnings("synthetic-access")
391 @Override
392 protected void handleUpdateException(Throwable t) {
393 if (t instanceof IOException) {
394 try {
395 if (VERBOSE) {
396 System.out.println("hit exception during update: " + t);
397 t.printStackTrace(System.out);
398 }
399
400
401 DirectoryReader reader = DirectoryReader.open(handlerIndexDir.getDelegate());
402 try {
403 int numDocs = reader.numDocs();
404 int version = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
405 assertEquals(numDocs, version);
406 } finally {
407 reader.close();
408 }
409
410 TestUtil.checkIndex(handlerIndexDir.getDelegate());
411
412
413
414 ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
415 CheckIndex.Status indexStatus = null;
416
417 try (CheckIndex checker = new CheckIndex(handlerTaxoDir.getDelegate())) {
418 checker.setFailFast(true);
419 checker.setInfoStream(new PrintStream(bos, false, IOUtils.UTF_8), false);
420 try {
421 indexStatus = checker.checkIndex(null);
422 } catch (IOException | RuntimeException ioe) {
423
424 }
425 }
426
427 if (indexStatus == null || indexStatus.clean == false) {
428
429
430
431
432
433
434
435
436
437
438 String segmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(handlerTaxoDir);
439 assertTrue(handlerTaxoDir.didTryToDelete(segmentsFileName));
440 handlerTaxoDir.getDelegate().deleteFile(segmentsFileName);
441 TestUtil.checkIndex(handlerTaxoDir.getDelegate());
442 }
443 } catch (IOException e) {
444 failed.set(true);
445 throw new RuntimeException(e);
446 } catch (RuntimeException e) {
447 failed.set(true);
448 throw e;
449 } finally {
450
451 failures.decrementAndGet();
452 assert failures.get() >= 0 : "handler failed too many times: " + failures.get();
453 if (VERBOSE) {
454 if (failures.get() == 0) {
455 System.out.println("no more failures expected");
456 } else {
457 System.out.println("num failures left: " + failures.get());
458 }
459 }
460 }
461 } else {
462 failed.set(true);
463 if (t instanceof RuntimeException) {
464 throw (RuntimeException) t;
465 }
466 throw new RuntimeException(t);
467 }
468 }
469 };
470
471 client.startUpdateThread(10, "indexAndTaxo");
472
473 final Directory baseHandlerIndexDir = handlerIndexDir.getDelegate();
474 int numRevisions = atLeast(20) + 2;
475 for (int i = 2; i < numRevisions && failed.get() == false; i++) {
476 replicator.publish(createRevision(i));
477 assertHandlerRevision(i, baseHandlerIndexDir);
478 }
479
480
481
482 handlerIndexDir.setMaxSizeInBytes(0);
483 handlerIndexDir.setRandomIOExceptionRate(0.0);
484 handlerIndexDir.setRandomIOExceptionRateOnOpen(0.0);
485 handlerTaxoDir.setMaxSizeInBytes(0);
486 handlerTaxoDir.setRandomIOExceptionRate(0.0);
487 handlerTaxoDir.setRandomIOExceptionRateOnOpen(0.0);
488 }
489
490 }